/**
 * Copyright 2015 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

/**
 * Original license:
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package io.confluent.common.metrics.stats;

import java.util.ArrayList;
import java.util.List;

import io.confluent.common.metrics.MeasurableStat;
import io.confluent.common.metrics.MetricConfig;

/**
 * A SampledStat records a single scalar value measured over one or more samples. Each sample is
 * recorded over a configurable window. The window can be defined by number of events or ellapsed
 * time (or both, if both are given the window is complete when <i>either</i> the event count or
 * ellapsed time criterion is met). <p> All the samples are combined to produce the measurement.
 * When a window is complete the oldest sample is cleared and recycled to begin recording the next
 * sample.
 *
 * Subclasses of this class define different statistics measured using this basic pattern.
 */
public abstract class SampledStat implements MeasurableStat {

  protected List<Sample> samples;
  private double initialValue;
  private int current = 0;

  public SampledStat(double initialValue) {
    this.initialValue = initialValue;
    this.samples = new ArrayList<Sample>(2);
  }

  @Override
  public void record(MetricConfig config, double value, long timeMs) {
    Sample sample = current(timeMs);
    if (sample.isComplete(timeMs, config)) {
      sample = advance(config, timeMs);
    }
    update(sample, config, value, timeMs);
    sample.eventCount += 1;
  }

  private Sample advance(MetricConfig config, long timeMs) {
    this.current = (this.current + 1) % config.samples();
    if (this.current >= samples.size()) {
      Sample sample = newSample(timeMs);
      this.samples.add(sample);
      return sample;
    } else {
      Sample sample = current(timeMs);
      sample.reset(timeMs);
      return sample;
    }
  }

  protected Sample newSample(long timeMs) {
    return new Sample(this.initialValue, timeMs);
  }

  @Override
  public double measure(MetricConfig config, long now) {
    purgeObsoleteSamples(config, now);
    return combine(this.samples, config, now);
  }

  public Sample current(long timeMs) {
    if (samples.size() == 0) {
      this.samples.add(newSample(timeMs));
    }
    return this.samples.get(this.current);
  }

  public Sample oldest(long now) {
    if (samples.size() == 0) {
      this.samples.add(newSample(now));
    }
    Sample oldest = this.samples.get(0);
    for (int i = 1; i < this.samples.size(); i++) {
      Sample curr = this.samples.get(i);
      if (curr.lastWindowMs < oldest.lastWindowMs) {
        oldest = curr;
      }
    }
    return oldest;
  }

  protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs);

  public abstract double combine(List<Sample> samples, MetricConfig config, long now);

  /* Timeout any windows that have expired in the absence of any events */
  protected void purgeObsoleteSamples(MetricConfig config, long now) {
    long expireAge = config.samples() * config.timeWindowMs();
    for (int i = 0; i < samples.size(); i++) {
      Sample sample = this.samples.get(i);
      if (now - sample.lastWindowMs >= expireAge) {
        sample.reset(now);
      }
    }
  }

  protected static class Sample {

    public double initialValue;
    public long eventCount;
    public long lastWindowMs;
    public double value;

    public Sample(double initialValue, long now) {
      this.initialValue = initialValue;
      this.eventCount = 0;
      this.lastWindowMs = now;
      this.value = initialValue;
    }

    public void reset(long now) {
      this.eventCount = 0;
      this.lastWindowMs = now;
      this.value = initialValue;
    }

    public boolean isComplete(long timeMs, MetricConfig config) {
      return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
    }
  }

}
